improve: use the default list methods with read-cache-after-write consistency#3336
improve: use the default list methods with read-cache-after-write consistency#3336csviri wants to merge 8 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR removes the “strong consistency” list/index variants by folding read-cache-after-write consistency behavior into the default list(...) / byIndexStream(...) methods on ManagedInformerEventSource, and updates tests to use the new API surface.
Changes:
- Replaced
listWithStrongConsistency(...)withlist(...)implementations that merge informer results with the temporary cache for read-cache-after-write consistency. - Replaced
byIndexStreamWithStrongConsistency(...)withbyIndexStream(...)and updatedbyIndex(...)to return merged results. - Updated informer event source tests to call the renamed/default methods.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java | Removes “strong consistency” variants, makes default list/index methods merge with the temp cache, and adjusts index lookup behavior. |
| operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java | Updates tests to use list(...) / byIndexStream(...) instead of the removed “strong consistency” methods. |
| @Override | ||
| public List<R> byIndex(String indexName, String indexKey) { | ||
| return mergeWithWithTempCacheResources( | ||
| manager().byIndexStream(indexName, indexKey), indexName, indexKey); | ||
| manager().byIndexStream(indexName, indexKey), indexName, indexKey) | ||
| .toList(); |
xstefank
left a comment
There was a problem hiding this comment.
Do we have a policy that public methods should be first deprecated before they are removed?
This is against |
| public Stream<R> list(String namespace, Predicate<R> predicate) { | ||
| return manager().list(namespace, predicate); | ||
| return mergeWithWithTempCacheResources( | ||
| manager().list(namespace, predicate), namespace, predicate); | ||
| } |
| @Override | ||
| public Stream<R> byIndexStream(String indexName, String indexKey) { | ||
| return mergeWithWithTempCacheResources( | ||
| manager().list(namespace, predicate), namespace, predicate); | ||
| } | ||
|
|
||
| /** | ||
| * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when | ||
| * resources are updated using {@link | ||
| * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. | ||
| */ | ||
| public Stream<R> listWithStrongConsistency(Predicate<R> predicate) { | ||
| return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); | ||
| manager().byIndexStream(indexName, indexKey), indexName, indexKey); | ||
| } |
| /** | ||
| * {@inheritDoc} | ||
| * | ||
| * <p>This implementation is read-cache-after-write consistent. Results are merged with the | ||
| * temporary resource cache to ensure recently written resources are reflected in the output. | ||
| */ |
| public Stream<R> list(String namespace, Predicate<R> predicate) { | ||
| return manager().list(namespace, predicate); | ||
| return mergeWithWithTempCacheResources( | ||
| manager().list(namespace, predicate), namespace, predicate); | ||
| } | ||
|
|
||
| /** | ||
| * {@inheritDoc} | ||
| * | ||
| * <p>This implementation is read-cache-after-write consistent. Results are merged with the | ||
| * temporary resource cache to ensure recently written resources are reflected in the output. | ||
| */ | ||
| @Override | ||
| public Stream<R> list(Predicate<R> predicate) { | ||
| return cache.list(predicate); | ||
| } | ||
|
|
||
| @Override | ||
| public List<R> byIndex(String indexName, String indexKey) { | ||
| return manager().byIndex(indexName, indexKey); | ||
| } | ||
|
|
||
| public Stream<R> byIndexStream(String indexName, String indexKey) { | ||
| return manager().byIndexStream(indexName, indexKey); | ||
| return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); | ||
| } | ||
|
|
||
| /** | ||
| * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is | ||
| * useful when resources are updated using {@link | ||
| * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. | ||
| * {@inheritDoc} | ||
| * | ||
| * <p>This implementation is read-cache-after-write consistent. Results are merged with the | ||
| * temporary resource cache to ensure recently written resources are reflected in the output. | ||
| */ | ||
| public Stream<R> listWithStrongConsistency(String namespace, Predicate<R> predicate) { | ||
| @Override | ||
| public Stream<R> byIndexStream(String indexName, String indexKey) { | ||
| return mergeWithWithTempCacheResources( | ||
| manager().list(namespace, predicate), namespace, predicate); | ||
| } | ||
|
|
||
| /** | ||
| * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when | ||
| * resources are updated using {@link | ||
| * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. | ||
| */ | ||
| public Stream<R> listWithStrongConsistency(Predicate<R> predicate) { | ||
| return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); | ||
| manager().byIndexStream(indexName, indexKey), indexName, indexKey); | ||
| } |
| return mergeWithWithTempCacheResources( | ||
| manager().list(namespace, predicate), namespace, predicate); |
…sistency Remove the index and list related methods "strong consistency variant". The rational behind this change is that the user either uses read-cache-after-write consistency feature, than the "strong read" should be used. If on the other hand tha feature, is not used, the current implementation does not impose any overhead. In addition to that strong consistency is not a good name, we call this feature as read-cache-after-write consistency, that might be too long though. Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (2)
operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java:1
- In
keys(),manager()is invoked multiple times andmanager().contains(k)is called once per temp key, which can be costly and can also be inconsistent ifmanager()is not stable. Consider capturingvar m = manager();once and de-duplicating without per-key lookups (e.g., concatenate the two key streams and usedistinct(), or collect manager keys into aSetonce and then filter temp keys against that set). Tradeoff:distinct()/collecting requires buffering keys, but avoids N extracontains()calls.
/*
operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java:1
- If
InformerManagerprovides a non-streambyIndex(...)that is more efficient thanbyIndexStream(...)(as it was used previously), consider using it and streaming the returned list only for merging/filtering. This can reduce stream overhead and avoid multiple stream traversals depending on the manager implementation.
/*
| private static <R extends HasMetadata> Predicate<R> filterResourceByNamespaceAndPredicate( | ||
| String namespace, Predicate<R> predicate) { | ||
| return r -> { | ||
| if (namespace != null) { | ||
| var res = Optional.of(r).map(ns -> ns.equals(namespace)).orElse(false); | ||
| if (!res) return false; | ||
| } | ||
| // we check if the ghost resource is part of the index | ||
| tempResourceStream = | ||
| tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)); | ||
| } else { | ||
| tempResourceStream = tempResources.values().stream(); | ||
| } | ||
| return Stream.concat(tempResourceStream, upToDateList.stream()); | ||
| if (predicate != null) { | ||
| return predicate.test(r); | ||
| } | ||
| return true; | ||
| }; | ||
| } |
| public Stream<R> list(String namespace, Predicate<R> predicate) { | ||
| return manager().list(namespace, predicate); | ||
| return mergeWithTempCacheForList(manager().list(namespace, predicate), namespace, predicate); | ||
| } |
| private Stream<R> mergeWithTempCacheForList( | ||
| Stream<R> stream, String namespace, Predicate<R> predicate) { | ||
| if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { | ||
| return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)); | ||
| } |
| .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)) | ||
| .toList(); |
Remove the index and list related methods "strong consistency variant".
The rational behind this change is that the user either uses read-cache-after-write consistency feature,
than the "strong read" should be used. If on the other hand tha feature,
is not used, the current implementation does not impose any overhead.
In addition to that strong consistency is not a good name, we call this feature
as read-cache-after-write consistency, that might be too long though.
Adds ghost resource handling for
keys().Add also missing javadocs.
Signed-off-by: Attila Mészáros a_meszaros@apple.com